package com.csw.shuanfa.CodeImprove.CompleteFutureLinkedBlockingQueue.RequestMergeLinkedBlockingQueue;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Service
public class CommodityService {
    @Autowired
    QueryServiceRemoteCall queryServiceRemoteCall;
    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时，容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样，ArrayBlockingQueue实现的队列中的锁是没有分离的，即添加操作和移除操作采用的同一个ReenterLock锁，而LinkedBlockingQueue实现的队列中的锁是分离的，其添加采用的是putLock，移除采用的则是takeLock，这样能大大提高队列的吞吐量，也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据，以此来提高整个队列的并发性能。
     */
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();   //这里因为是测试,所以使用的是无界队列

    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是周期性执行10毫秒执行一次
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            //合并了" + size + "个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                list.add(queue.poll());
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<String> commodityCodes = new ArrayList<>();
            for (Request request : list) {
                commodityCodes.add(request.code);
            }
            //将参数传入service处理
            Map<String, HashMap<String, Object>> response = queryServiceRemoteCall.queryCommodityByCodeBatch(commodityCodes);
            //将处理结果返回各自的请求
            for (Request request : list) {
                Map<String, Object> result = response.get(request.code);
                request.completableFuture.complete(result);    //completableFuture.complete方法完成赋值,这一步执行完毕,阻塞的请求可以继续执行了
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
    }

    public Map<String, Object> queryCommodity(String code) throws ExecutionException, InterruptedException {
        Request request = new Request();
        request.code = code;
        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
        request.completableFuture = future;
        //将对象传入队列
        queue.add(request);
        //如果这时候没完成赋值,那么就会阻塞,知道能够拿到值
        return future.get();
    }

    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */
    class Request {
        String code;
        CompletableFuture completableFuture;
    }
}
